package com.atguigu.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class FlinkApiJoinApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1  数据源   分别从kafka的两个主题中 获取流
        String demoLeftTopic="demo_left_topic";
        String demoRightTopic="demo_right_topic";
        env.setParallelism(1);
        FlinkKafkaConsumer<String> leftConsumer = KafkaUtil.getFlinkKafkaConsumer(demoLeftTopic, "api_join");
        FlinkKafkaConsumer<String> rightConsumer = KafkaUtil.getFlinkKafkaConsumer(demoRightTopic, "api_join");

        DataStreamSource<String> leftStream = env.addSource(leftConsumer);
        DataStreamSource<String> rightStream = env.addSource(rightConsumer);
        //2   调整数据流结构 把jsonstring ->jsonObj
        SingleOutputStreamOperator<JSONObject> leftJsonStream = leftStream.map(jsonString -> JSON.parseObject(jsonString));
        SingleOutputStreamOperator<JSONObject> rightJsonStream = rightStream.map(jsonString -> JSON.parseObject(jsonString));

        //3  因为后面要根据时间进行开窗，需要水位线 watermark,从数据中提取ts
        SingleOutputStreamOperator<JSONObject> leftJsonWmStream = leftJsonStream.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofMillis(2000))
                .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                    @Override
                    public long extractTimestamp(JSONObject jsonObject, long l) {
                        return jsonObject.getLong("ts");
                    }
                })
        );


        SingleOutputStreamOperator<JSONObject> rightJsonWmStream = rightJsonStream.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofMillis(2000))
                .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                    @Override
                    public long extractTimestamp(JSONObject jsonObject, long l) {
                        return jsonObject.getLong("ts");
                    }
                })
        );

        //4  进行window join (  1 window 2 join) // select xxx from a join b on a.id=b.order_id
//        DataStream<JSONObject> joinedDstream = leftJsonWmStream.join(rightJsonWmStream)
//                .where(jsonObj -> jsonObj.getString("id"))
//                .equalTo(jsonObj -> jsonObj.getString("order_id"))
//                .window(TumblingEventTimeWindows.of(Time.milliseconds(10000)))
//                .apply(new JoinFunction<JSONObject, JSONObject, JSONObject>() {
//                    @Override
//                    public JSONObject join(JSONObject leftJsonObj, JSONObject rightJsonObj) throws Exception {
//                        // 实现如何把两个对象合并为一个对象
//                        JSONObject newJsonObj = new JSONObject();
//                        newJsonObj.put("detail_id", rightJsonObj.getString("id"));
//                        newJsonObj.put("order_id", rightJsonObj.getString("order_id"));
//                        newJsonObj.put("user_id", leftJsonObj.getString("user_id"));
//                        newJsonObj.put("status", leftJsonObj.getString("status"));
//                        newJsonObj.put("sku_id", rightJsonObj.getString("sku_id"));
//                        newJsonObj.put("ts", rightJsonObj.getLong("ts"));
//                        return newJsonObj;
//                    }
//                });

    //    joinedDstream.print();


        // 订单明细
        //   {"id":"991","order_id":"101","amount":200,"ts":1000}
        //   {"id":"992","order_id":"102","amount":400,"ts":1000}
        //   {"id":"993","order_id":"103","amount":100,"ts":13000}
        //
        //   购物券
        //   {"id":"7701","order_detail_id":"991","coupon_id":"3301" ,"ts":1000}
        //   {"id":"7702","order_detail_id":"993","coupon_id":"3302" ,"ts":12000}
//        DataStream<JSONObject> cogroupStream = leftJsonWmStream.coGroup(rightJsonWmStream)
//                .where(jsonObject -> jsonObject.getString("id"))
//                .equalTo(jsonObject -> jsonObject.getString("order_detail_id"))
//                .window(TumblingEventTimeWindows.of(Time.milliseconds(10000)))
//                .apply(new CoGroupFunction<JSONObject, JSONObject, JSONObject>() {
//                    @Override
//                    public void coGroup(Iterable<JSONObject> leftItr, Iterable<JSONObject> rightItr, Collector<JSONObject> collector) throws Exception {
//                        // 实现left join
//                        //  思考： 如何实现full join
//
//                        for (JSONObject leftJsonObject : leftItr) {
//                            boolean ifExistsRight = false;
//                            for (JSONObject rightJsonObject : rightItr) {
//                                ifExistsRight = true;   //如果有右表 则迭代关联右表
//                                leftJsonObject.put("coupon_id", rightJsonObject.getString("coupon_id"));
//                                collector.collect(leftJsonObject);
//                            }
//                            if (!ifExistsRight) {  //如果没有右边 直接输出左边
//                                collector.collect(leftJsonObject);
//                            }
//
//                        }
//
//                    }
//                });
//        cogroupStream.print();



        // interval join
        //订单：
        //{"id":"101","user_id":"u_101","status":"1001", "amount":200,"ts":1000}
        //{"id":"102","user_id":"u_102","status":"1001", "amount":400,"ts":11000}
        //{"id":"103","user_id":"u_103","status":"1001", "amount":600,"ts":15000}
        //
        //支付
        //  {"id":"55","order_id":"101","type":"wx"  ,"ts":1000}
        //  {"id":"56","order_id":"102","type":"ali" ,"ts":12000}

        SingleOutputStreamOperator<JSONObject> intervalJoinedStream = leftJsonWmStream.keyBy(jsonObject -> jsonObject.getString("id"))
                .intervalJoin(rightJsonWmStream.keyBy(jsonObject -> jsonObject.getString("order_id")))
                .between(Time.milliseconds(-50), Time.milliseconds(15000))
                .process(new ProcessJoinFunction<JSONObject, JSONObject, JSONObject>() {
                    @Override
                    public void processElement(JSONObject leftJsonObj, JSONObject rightJsonObj, ProcessJoinFunction<JSONObject, JSONObject, JSONObject>.Context context, Collector<JSONObject> collector) throws Exception {
                        leftJsonObj.put("pay_type", rightJsonObj.getString("type"));
                        leftJsonObj.put("pay_ts", rightJsonObj.getString("ts"));
                        collector.collect(leftJsonObj);
                    }
                });
        intervalJoinedStream.print();

        env.execute();
    }
}
